/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb.tsfile.read.query.dataset;

import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;

import java.io.IOException;
import java.util.List;
import java.util.Set;

public abstract class QueryDataSet {

    protected List<Path> paths;
    protected List<TSDataType> dataTypes;

    protected int rowLimit = 0; // rowLimit > 0 means the LIMIT constraint exists
    protected int rowOffset = 0;
    protected int alreadyReturnedRowNum = 0;
    protected int fetchSize = 10000;
    protected boolean ascending;
    /*
     *  whether current data group has data for query.
     *  If not null(must be in cluster mode),
     *  we need to redirect the query to any data group which has some data to speed up query.
     */
    protected EndPoint endPoint = null;

    /**
     * if any column is null, we don't need that row
     */
    protected boolean withoutAnyNull;

    /**
     * Only if all columns are null, we don't need that row
     */
    protected boolean withoutAllNull;

    /**
     * index set that withoutNullColumns for output data columns
     */
    protected Set<Integer> withoutNullColumnsIndex;

    protected int columnNum;

    /**
     * For redirect query. Need keep consistent with EndPoint in rpc.thrift.
     */
    public static class EndPoint {
        private String ip = null;
        private int port = 0;

        public EndPoint(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }

        public EndPoint() {
        }

        public String getIp() {
            return ip;
        }

        public void setIp(String ip) {
            this.ip = ip;
        }

        public int getPort() {
            return port;
        }

        public void setPort(int port) {
            this.port = port;
        }

        @Override
        public String toString() {
            return "ip:port=" + ip + ":" + port;
        }
    }

    public QueryDataSet() {
    }

    public QueryDataSet(List<Path> paths, List<TSDataType> dataTypes) {
        initQueryDataSetFields(paths, dataTypes, true);
    }

    public QueryDataSet(List<Path> paths, List<TSDataType> dataTypes, boolean ascending) {
        initQueryDataSetFields(paths, dataTypes, ascending);
    }

    protected void initQueryDataSetFields(
            List<Path> paths, List<TSDataType> dataTypes, boolean ascending) {
        this.paths = paths;
        this.dataTypes = dataTypes;
        this.ascending = ascending;
        this.columnNum = 0;
        if (paths != null) {
            for (Path p : paths) {
                columnNum += p.getColumnNum();
            }
        }
    }

    public Set<Integer> getWithoutNullColumnsIndex() {
        return withoutNullColumnsIndex;
    }

    public void setWithoutNullColumnsIndex(Set<Integer> withoutNullColumnsIndex) {
        this.withoutNullColumnsIndex = withoutNullColumnsIndex;
    }

    public boolean hasNext() throws IOException {
        // proceed to the OFFSET row by skipping rows
        while (rowOffset > 0) {
            if (hasNextWithoutConstraint()) {
                RowRecord rowRecord = nextWithoutConstraint(); // DO NOT use next()
                // filter rows whose columns are null according to the rule
                if (withoutNullFilter(rowRecord)) {
                    continue;
                }
                rowOffset--;
            } else {
                return false;
            }
        }

        // make sure within the LIMIT constraint if exists
        if (rowLimit > 0 && alreadyReturnedRowNum >= rowLimit) {
            return false;
        }

        return hasNextWithoutConstraint();
    }

    /**
     * check rowRecord whether satisfy without null condition
     *
     * @param rowRecord rowRecord
     * @return true satisfy false don't satisfy
     */
    public boolean withoutNullFilter(RowRecord rowRecord) {
        boolean
                anyNullFlag =
                (withoutNullColumnsIndex == null)
                        ? rowRecord.hasNullField()
                        : (withoutNullColumnsIndex.isEmpty() && rowRecord.hasNullField()),
                allNullFlag = (withoutNullColumnsIndex != null) || rowRecord.isAllNull();

        if (withoutNullColumnsIndex != null) {
            for (int index : withoutNullColumnsIndex) {
                Field field = rowRecord.getFields().get(index);
                if (field == null || field.getDataType() == null) {
                    anyNullFlag = true;
                    if (withoutAnyNull) {
                        break;
                    }
                } else {
                    allNullFlag = false;
                    if (withoutAllNull) {
                        break;
                    }
                }
            }
        }

        if (withoutNullColumnsIndex != null && withoutNullColumnsIndex.isEmpty()) {
            allNullFlag = rowRecord.isAllNull();
        }
        return (withoutAllNull && allNullFlag) || (withoutAnyNull && anyNullFlag);
    }

    public abstract boolean hasNextWithoutConstraint() throws IOException;

    /**
     * This method is used for batch query, return RowRecord.
     */
    public RowRecord next() throws IOException {
        if (rowLimit > 0) {
            alreadyReturnedRowNum++;
        }
        return nextWithoutConstraint();
    }

    public void setFetchSize(int fetchSize) {
        this.fetchSize = fetchSize;
    }

    public abstract RowRecord nextWithoutConstraint() throws IOException;

    public List<Path> getPaths() {
        return paths;
    }

    public List<TSDataType> getDataTypes() {
        return dataTypes;
    }

    public void setDataTypes(List<TSDataType> dataTypes) {
        this.dataTypes = dataTypes;
    }

    public int getRowLimit() {
        return rowLimit;
    }

    public void setRowLimit(int rowLimit) {
        this.rowLimit = rowLimit;
    }

    public int getRowOffset() {
        return rowOffset;
    }

    public void setRowOffset(int rowOffset) {
        this.rowOffset = rowOffset;
    }

    public boolean hasLimit() {
        return rowLimit > 0;
    }

    public EndPoint getEndPoint() {
        return endPoint;
    }

    public void setEndPoint(EndPoint endPoint) {
        this.endPoint = endPoint;
    }

    public boolean isWithoutAnyNull() {
        return withoutAnyNull;
    }

    public void setWithoutAnyNull(boolean withoutAnyNull) {
        this.withoutAnyNull = withoutAnyNull;
    }

    public boolean isWithoutAllNull() {
        return withoutAllNull;
    }

    public void setWithoutAllNull(boolean withoutAllNull) {
        this.withoutAllNull = withoutAllNull;
    }

    public void decreaseAlreadyReturnedRowNum() {
        alreadyReturnedRowNum--;
    }

    public int getColumnNum() {
        return columnNum;
    }

    public void setColumnNum(int columnNum) {
        this.columnNum = columnNum;
    }
}
